Lab 3 - Spark MLlib

"A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P if its performance at tasks in T, as measured by P, improves with experience E"

-Tom M. Mitchell

Machine Learning - the science of getting computers to act without being explicitly programmed

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. It consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this example!), dimensionality reduction, as well as lower-level optimization primitives and higher-level pipeline APIs.

It divides into two packages:

  • spark.mllib contains the original API built on top of RDDs.
  • spark.ml provides higher-level API built on top of DataFrames for constructing ML pipelines.

Using spark.ml is recommended because the DataFrame-based API is more versatile and flexible. spark.mllib is still supported alongside the development of spark.ml, so users can keep using spark.mllib features and expect more features to come.

http://spark.apache.org/docs/latest/mllib-guide.html
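To make the difference concrete, here is a minimal sketch of how ALS looks in each package. The variable names ratingsRDD and ratingsDf are placeholders for data you would already have loaded; this lab uses the spark.ml (DataFrame) API.

# spark.mllib: RDD-based API (assumes ratingsRDD is an RDD of Rating objects)
from pyspark.mllib.recommendation import ALS as MLlibALS, Rating
rddModel = MLlibALS.train(ratingsRDD, rank=10, iterations=5)

# spark.ml: DataFrame-based API (assumes ratingsDf has user/item/rating columns)
from pyspark.ml.recommendation import ALS
dfModel = ALS(userCol="user", itemCol="item", ratingCol="rating").fit(ratingsDf)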

Online Purchase Recommendations

Learn how to create a recommendation engine using the Alternating Least Squares algorithm in Spark's machine learning library
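ALS factors the (sparse) customer-by-item purchase matrix into two much smaller matrices of latent factors, alternating between solving for the customer factors and the item factors. The toy numpy sketch below (not part of the lab, and not the MLlib implementation) illustrates the idea on a tiny dense matrix:

import numpy as np

# Toy purchase matrix: 3 customers x 4 items, 1 = purchased
R = np.array([[1., 0., 1., 0.],
              [1., 1., 0., 0.],
              [0., 1., 0., 1.]])

rank, reg = 2, 0.1
U = np.random.rand(R.shape[0], rank)   # customer factors
V = np.random.rand(R.shape[1], rank)   # item factors

for _ in range(10):
    # Fix V and solve a regularized least-squares problem for U, then swap
    U = np.linalg.solve(V.T.dot(V) + reg * np.eye(rank), V.T.dot(R.T)).T
    V = np.linalg.solve(U.T.dot(U) + reg * np.eye(rank), U.T.dot(R)).T

print(np.round(U.dot(V.T), 2))         # reconstruction, approximately R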

The data

This is a transactional data set which contains all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based, registered, non-store online retailer. The company mainly sells unique all-occasion gifts. Many customers of the company are wholesalers.

http://archive.ics.uci.edu/ml/datasets/Online+Retail

Step 1 - Create an RDD from the csv data

1.1 - Download the data


In [1]:
!rm 'OnlineRetail.csv.gz' -f
!wget https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/OnlineRetail.csv.gz


--2016-04-01 18:47:28--  https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/OnlineRetail.csv.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 23.235.46.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|23.235.46.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7483128 (7.1M) [application/octet-stream]
Saving to: ‘OnlineRetail.csv.gz’

100%[======================================>] 7,483,128   31.1MB/s   in 0.2s   

2016-04-01 18:47:29 (31.1 MB/s) - ‘OnlineRetail.csv.gz’ saved [7483128/7483128]

1.2 - Put the csv into an RDD (at first, each row in the RDD is a string that corresponds to a line in the csv)

Type:
loadRetailData = sc.textFile("/resources/OnlineRetail.csv.gz")


In [ ]:
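If you want to confirm the file loaded before moving on, an optional sanity check after running the cell above is to look at the first element (the csv header line) and the row count:

print(loadRetailData.first())   # the csv header line
print(loadRetailData.count())   # number of lines, including the header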

Step 2 - Prepare and shape the data: "80% of a Data Scientist's job"

2.1 - Remove the header from the RDD and split each row by comma

Type:
header = loadRetailData.first()
splitColumns = loadRetailData.filter(lambda line: line != header).map(lambda l: l.split(","))


In [ ]:

2.2 - Filter the data

NOTE: The original file at UCI's Machine Learning Repository has commas in the product descriptions. Those have been removed to expedite the lab.

Only keep rows that have a purchase quantity greater than 0, a non-blank customer ID, and a non-blank stock code after removing non-numeric characters

Type:
import re
filteredRetailData = splitColumns.filter(lambda l: int(l[3]) > 0 and len(re.sub(r"\D", "", l[1])) != 0 and len(l[6]) != 0)


In [ ]:

2.3 - Map each line to a row and create a data frame

Type:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

retailRows = filteredRetailData.map(lambda l: Row(inv=int(l[0]),
                                                  stockCode=int(re.sub(r"\D", "", l[1])),
                                                  description=l[2],
                                                  quant=int(l[3]),
                                                  invDate=l[4],
                                                  price=float(l[5]),
                                                  custId=int(l[6]),
                                                  country=l[7]))

retailDf = sqlContext.createDataFrame(retailRows)
retailDf.registerTempTable("retailPurchases")


In [ ]:

2.4 - Keep only the data we need (custId, stockCode, and purch)

Type: query = """ SELECT custId, stockCode, 1 as purch FROM retailPurchases group by custId, stockCode"""
uniqueCombDf = sqlContext.sql(query)


In [ ]:
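As an aside, the same result can be produced with the DataFrame API instead of SQL. The sketch below assumes retailDf from step 2.3; uniqueCombDfAlt is just an illustrative name:

from pyspark.sql.functions import lit

# One row per distinct (custId, stockCode) pair, with purch = 1
uniqueCombDfAlt = retailDf.select("custId", "stockCode").distinct().withColumn("purch", lit(1))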

2.5 - Randomly split the data into a testing set (10% of the data), a cross-validation set (10% of the data), and a training set (80% of the data)

Type:
testDf, cvDf, trainDf = uniqueCombDf.randomSplit([.1,.1,.8])


In [ ]:
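randomSplit draws randomly, so the three sets will differ from run to run. If you want a reproducible split, an optional variation is to pass a seed and check the resulting sizes:

testDf, cvDf, trainDf = uniqueCombDf.randomSplit([.1, .1, .8], seed=42)
print(testDf.count())
print(cvDf.count())
print(trainDf.count())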

Step 3 - Build recommendation models

3.1 - Train two models with Alternating Least Squares using the training data (the cross-validation data will be used to compare them in Step 4)

  • Latent factors / rank: the number of columns in the user-feature and product-feature matrices.
  • Iterations / maxIter: the number of times ALS alternates between solving for the user factors and the item factors.

Type: from pyspark.ml.recommendation import ALS

als1 = ALS(rank=15, maxIter=5, userCol="custId", itemCol="stockCode", ratingCol="purch")
model1 = als1.fit(trainDf)

als2 = ALS(rank=2, maxIter=10, userCol="custId", itemCol="stockCode", ratingCol="purch")
model2 = als2.fit(trainDf)


In [ ]:
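Because purch is a 0/1 purchase indicator rather than an explicit rating, ALS also has an implicit-feedback mode you could experiment with. An optional sketch, not used in the rest of the lab:

# Optional variant: treat the 0/1 purchases as implicit feedback
alsImplicit = ALS(rank=15, maxIter=5, implicitPrefs=True,
                  userCol="custId", itemCol="stockCode", ratingCol="purch")
modelImplicit = alsImplicit.fit(trainDf)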

Step 4 - Test the models

Use the models to predict how a user would rate an item. The closer a model's prediction is to 1 for an item the user has already purchased, the better.

4.1 - Evaluate the models with the cross-validation DataFrame by using the transform function.

Some of the users or purchases in the cross validation data may not have been in the training data. Let's remove the ones that aren't.

Type: customers = set(trainDf.rdd.map(lambda line: line.custId).collect())
stock = set(trainDf.rdd.map(lambda line: line.stockCode).collect())

filteredCvDf = cvDf.rdd.filter(lambda line: line.stockCode in stock and line.custId in customers).toDF()

print cvDf.count()
print filteredCvDf.count()


In [ ]:

Type: predictions1 = model1.transform(filteredCvDf)
predictions2 = model2.transform(filteredCvDf)


In [ ]:

4.2 - Calculate and print the Mean Squared Error. For all ratings, subtract the prediction from the actual purchase (1), square the result, and take the mean of all of the squared differences.
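For example, if the predictions for three purchased items were 0.9, 1.2, and 0.5, the squared errors would be 0.01, 0.04, and 0.25, giving a mean squared error of 0.10.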

Type: meanSquaredError1 = predictions1.map(lambda line: (line.purch - line.prediction)**2).mean()
meanSquaredError2 = predictions2.map(lambda line: (line.purch - line.prediction)**2).mean()

print 'Mean squared error = %.4f for our first model' % meanSquaredError1
print 'Mean squared error = %.4f for our second model' % meanSquaredError2


In [ ]:

4.3 - Confirm the model by testing it with the test data, using the best hyperparameters found during cross-validation

Type: filteredTestDf = testDf.rdd.filter(lambda line: line.stockCode in stock and line.custId in customers).toDF()
predictions3 = model2.transform(filteredTestDf)
meanSquaredError3 = predictions3.map(lambda line: (line.purch - line.prediction)**2).mean()

print 'Mean squared error = %.4f for our best model' % meanSquaredError3


In [ ]:

Step 5 - Implement the model

Use the best model to predict items the user will be interested in.

5.1 - First, create a DataFrame in which each row pairs the user ID (15544) with one of the item IDs from the training data.

Type: userItems = trainDf.filter(trainDf.custId == 15544).select("custId").distinct().join( trainDf.select("stockCode").distinct())


In [ ]:

5.2 - Use 'transform' to rate each item.

Type: bestRecsDf = model2.transform(userItems)


In [ ]:

5.3 - Print the top 5 recommendations.

Type: print bestRecsDf.sort("prediction",ascending=False).take(5)


In [ ]:

Let's look up this user and the recommended product IDs in the Excel file...

This user seems to have purchased a lot of children's gifts and some holiday items. The recommendation engine we created suggested some items along these lines.

The ALS algorithm uses some randomness, so the recommendations your model produces may be different from these.

In [ ]:
query = """
SELECT 
    distinct description 
FROM 
    retailPurchases 
WHERE 
    stockCode in ()
"""
items = sqlContext.sql(query)
print items.toPandas()
Data Citation

Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197–208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17).